package com.jhf.youke.flink.domain.service;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
import org.springframework.stereotype.Component;

/**
 * Flink streaming demo: reads lines of text from a socket, splits them into
 * words and prints a running count per word.
 *
 * <p>The class is registered as a Spring component and can also be started
 * directly through {@link #main(String[])}.
 *
 * <p>Created 2022/11/11.
 *
 * @author RHJ
 */
@Component
public class StreamSetService {

    /** Host that {@link #wordCount()} connects to. */
    private static final String DEFAULT_HOST = "localhost";

    /** Port that {@link #wordCount()} connects to. */
    private static final int DEFAULT_PORT = 777;

    /** Execution environment used to build and run the pipeline. */
    private final StreamExecutionEnvironment env;

    /**
     * Creates the service and obtains the execution environment from
     * {@link StreamExecutionEnvironment#getExecutionEnvironment()}.
     */
    public StreamSetService() {
        this.env = StreamExecutionEnvironment.getExecutionEnvironment();
    }

    /**
     * Runs the word count against {@code localhost:777}.
     *
     * <p>Input can be simulated by sending data with {@code nc -lk 777},
     * e.g. from the Windows Subsystem for Linux.
     *
     * @throws Exception if building or executing the job fails
     */
    public void wordCount() throws Exception {
        wordCount(DEFAULT_HOST, DEFAULT_PORT);
    }

    /**
     * Runs the word count against the given socket: every received line is
     * split into words, the words are keyed by their text, the per-word
     * counts are summed and the result stream is printed.
     *
     * @param host host name of the text socket to read from
     * @param port port of the text socket to read from
     * @throws Exception if building or executing the job fails
     */
    public void wordCount(String host, int port) throws Exception {
        DataStream<String> lines = env.socketTextStream(host, port);

        // Key by the word itself (tuple field f0) through a KeySelector;
        // the index-based keyBy(int) overload is deprecated.
        DataStream<Tuple2<String, Integer>> counts = lines
                .flatMap(new WordFlatMapFunction())
                .keyBy(pair -> pair.f0)
                .sum(1);

        counts.print();
        env.execute();
    }

    /**
     * Splits a line on single spaces and emits {@code (word, 1)} for every
     * non-empty token.
     *
     * <p>Declared {@code static} so that instances do not carry a hidden
     * reference to the enclosing {@link StreamSetService} (and through it the
     * execution environment) when the function is serialized.
     */
    public static class WordFlatMapFunction implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String str, Collector<Tuple2<String, Integer>> collector) throws Exception {
            for (String word : str.split(" ")) {
                // Blank lines and leading/repeated spaces produce empty tokens;
                // they are not words and must not be counted.
                if (word.isEmpty()) {
                    continue;
                }
                collector.collect(new Tuple2<>(word, 1));
            }
        }
    }

    /**
     * Stand-alone entry point: creates the service and runs the default word count.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if the job fails
     */
    public static void main(String[] args) throws Exception {
        StreamSetService streamSetService = new StreamSetService();
        streamSetService.wordCount();
    }

    /**
     * Simple name/age holder with public fields and a public no-arg
     * constructor. It is not referenced by the word-count pipeline in this class.
     */
    public static class Person {
        public String name;
        public Integer age;

        /** Creates a person with both fields left {@code null}. */
        public Person() {}

        /**
         * @param name the person's name
         * @param age the person's age
         */
        public Person(String name, Integer age) {
            this.name = name;
            this.age = age;
        }

        /**
         * Returns {@code "<name>: age <age>"}. Null fields are rendered as
         * {@code "null"} instead of throwing, since the no-arg constructor
         * leaves both fields unset.
         */
        @Override
        public String toString() {
            return this.name + ": age " + this.age;
        }
    }

}
